DataTracks integration through FFI#66
Conversation
31fac42 to
e153e2e
Compare
ladvoc
left a comment
There was a problem hiding this comment.
Some initial comments, but I will make another pass. Looking clean so far!
00f046a to
b07035d
Compare
src/local_data_track.cpp
Outdated
|
|
||
| proto::FfiResponse resp = FfiClient::instance().sendRequest(req); | ||
| const auto &r = resp.local_data_track_try_push(); | ||
| return !r.has_error(); |
There was a problem hiding this comment.
question: Should we be exposing the error reason to the caller? There are several reasons why pushing a frame could fail (e.g., the track is unpublished, buffer full, etc.). For the ROS bridge, a boolean might be fine if there is logging, but for the core SDK, I lean towards we should report the error reason.
Related question: the FFI interface currently flattens all errors to strings for simplicty, however, on the Rust side there are proper error enums that describe the reason for the error. Should I expose these over FFI rather than using strings? Can an enum describing the error reason map nicely into a C++ exception?
There was a problem hiding this comment.
great question -- I need to sync with @xianshijing-lk on what the client facing applications should do. I think in general Livekit should never cause an application to crash (other than assert calls), so the sdk should catch any exceptions. Of course we still want to flow up the root cause (which we are currently doing). I could be way off here.
There was a problem hiding this comment.
for SDKs used in real-time systems, in general, exceptions are discouraged for "expected" failures like "buffer full" or "track unpublished".
I think the best practice here might be returning an result, which will include the success or error codes/Enums ?
There was a problem hiding this comment.
thanks for the guidance. I like the enum idea. I will implement something like:
Enum ErrorCodes
{
QUEUE_FULL,
...,
UNKOWN
}
I will also adopt this going forward when applicable!
There was a problem hiding this comment.
Note: on the Rust side, all fallible operations define error enums which enumerate all possible reasons for the failure. Currently, these are converted to strings when passed over FFI, but we can expose the actual enums if that's helpful.
There was a problem hiding this comment.
i went here with returning a templated Result which has the form:
template <typename T, typename E> class Result
An example is <std::shared_ptr<DataTrackSubscription>, DataTrackErrorCode> . I think this is a little complex for the current state but it should scale really well for new error codes! LMK what you think, we can simplify if need be.
There was a problem hiding this comment.
@ladvoc we can definelty support the enums! As of now we are kind of bastardizing them by setting them custom based on some checks. For example stuff like:
if (!handle_.valid()) {
return Result<void, DataTrackError>::failure(
DataTrackError{DataTrackErrorCode::INVALID_HANDLE,
"LocalDataTrack::tryPush: invalid FFI handle", false});
}
ladvoc
left a comment
There was a problem hiding this comment.
Some additional comments.
8647582 to
74e4b99
Compare
6413736 to
2b792db
Compare
73062e1 to
98658f7
Compare
…ame, assert(since that breaks cpp<->ffi<->rust agreement of rust handling buffering
f14faa5 to
f8f7685
Compare
src/local_data_track.cpp
Outdated
|
|
||
| proto::FfiResponse resp = FfiClient::instance().sendRequest(req); | ||
| const auto &r = resp.local_data_track_try_push(); | ||
| return !r.has_error(); |
There was a problem hiding this comment.
for SDKs used in real-time systems, in general, exceptions are discouraged for "expected" failures like "buffer full" or "track unpublished".
I think the best practice here might be returning an result, which will include the success or error codes/Enums ?
| * // process frame.payload | ||
| * } | ||
| */ | ||
| class DataTrackSubscription { |
There was a problem hiding this comment.
maybe I asked this question before, I wonder if should be called data_track_stream ?
or remote_data_track_stream ?
so it will be aligned with audio_stream / video_stream
There was a problem hiding this comment.
i had originally named as data_track_subscription because:
a. data tracks follow a slightly different format than video/audio
b. conceptually it was easier for me to understand.
However, i think your point is especially valid because
a. only data track publish and unpublish are different
b. consistency is important.
Will update once all other comments are resolved to avoid confusing referencing of MR comments!
There was a problem hiding this comment.
I would push for keeping it as subscription since there are semantic differences between the two:
- AudioStream and VideoStream for are purely local frame taps. Dropping one removes a local sink, but the SFU subscription is unaffected.
- DataTrackSubscription controls the subscription itself; the first subscribe() call triggers SFU signaling, and dropping the last subscription for a track unsubscribes from the SFU. It's not a passive reader, but rather managing a resource.
There was a problem hiding this comment.
if the trade off is align with cpp or align with other SDKs, aligning with other SDKs is more future proofing.
src/data_track_subscription.cpp
Outdated
|
|
||
| const auto &dts = event.data_track_subscription_event(); | ||
| if (dts.subscription_handle() != | ||
| static_cast<std::uint64_t>(subscription_handle_.get())) { |
There was a problem hiding this comment.
this worries me a bit, onFfiEvent is a callback. If DataTrackSubscription is moved to a new memory location, the lambda captured this.
take a step back, why do we need to support the move operator ? do we see real use cases from it ?
There was a problem hiding this comment.
This is a really good catch.
Since the public API provides a shared_ptr, i dont think we do need to provide a move constructor. the pointer itself can be std::move(), not the object itself.
Side note: I wonder how we can setup some tests to try to catch edge cases like this.
src/data_track_subscription.cpp
Outdated
| auto *msg = req.mutable_data_track_subscription_read(); | ||
| msg->set_subscription_handle( | ||
| static_cast<uint64_t>(subscription_handle_.get())); | ||
| FfiClient::instance().sendRequest(req); |
There was a problem hiding this comment.
I think there are some races there.
like one thread calls this read(), and pass this section under std::lock_guardstd::mutex lock(mutex_); and get to this section.
meanwhile, close() is being called, and reset the subscription_handle_
There was a problem hiding this comment.
Nice thank you -- with the addition of this request call to the data tracks API i missed putting this in the lock!
There was a problem hiding this comment.
I also added better logic to close() which:
- gets the lock
- sets closed_ = true
- resets the subscription handle
- sets listener_id_ = 0
- releases the lock
- calls ffi RemoveListener()
There was a problem hiding this comment.
I guess as long as you release the log in the middle of the read function, and close() function reset the subscription_handle_, you will still run into the race here where the invalid subscription_handle_ is being used
There was a problem hiding this comment.
maybe it is better to double check it with the AI ?
There was a problem hiding this comment.
i think the race is gated by checks of if (closed_) return; before subscription_handle_ is used (except for init).
I did ask Codex GPT 5.4 though and it said:
In the current read() path, the handle is read and used while mutex_ is still held in src/data_track_subscription.cpp (line 39). Specifically, read():
- takes mutex_ at src/data_track_subscription.cpp (line 41)
- checks closed_ / eof_
- copies subscription_handle_.get() to a local at src/data_track_subscription.cpp (line 49)
- sends the FFI read request at src/data_track_subscription.cpp (line 58)
include/livekit/local_data_track.h
Outdated
| * @return true on success, false if the push failed (e.g. back-pressure | ||
| * or the track has been unpublished). | ||
| */ | ||
| bool tryPush(const std::uint8_t *data, std::size_t size, |
There was a problem hiding this comment.
curiously, do we need all these 3 overloaded methods ? or we should push for one instead?
There was a problem hiding this comment.
great point, and we have four tryPush() calls after adding
bool tryPush(std::vector<std::uint8_t> &&payload,
std::optional<std::uint64_t> user_timestamp = std::nullopt);
I think this is too much. My opinion here would be to keep the efficient tryPush() call which uses std::move(), and keep:
bool tryPush(const DataFrame &frame);
I can forsee bool tryPush(const DataFrame &frame); being used to directly forward data from one room to another.
There was a problem hiding this comment.
I agree, we should anticipate data track frames having more metadata fields in the future, and we likely won't want overloads for all of them. Also, small nitpick: in the other SDKs DataFrame is called DataTrackFrame.
There was a problem hiding this comment.
will rename this to DataTrackFrame to align with other SDKs once other comments are resolved for easier tracing.
Sorry, I think I missed some details and have some more comments to address before landing this PR.
…d callback semantics and the public API returns a shared_ptr<>. Clean up race logic in read() by making the subscription read req while locked. close(): reset subscription handle, listener_id_, and closed_ under lock. Then call RemoveListener() ouside lock
227c9a5 to
15bbecc
Compare
Overview
Integrate DataTracks into the CPP SDK through the FFI
Testing
Examples
examples/ping_pongBlocked By
Rust SDKs PR
Resolves BOT-268